package net.wicp.tams.common.flink.source.binlog;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import lombok.extern.slf4j.Slf4j;
import net.wicp.tams.common.binlog.alone.BusiAssit;
import net.wicp.tams.common.binlog.alone.ListenerConf.CheckPoint;
import net.wicp.tams.common.binlog.alone.ListenerConf.ColHis;
import net.wicp.tams.common.binlog.alone.ListenerConf.ConnConf;
import net.wicp.tams.common.binlog.alone.ListenerConf.ConnConf.Builder;
import net.wicp.tams.common.binlog.alone.ListenerConf.Position;
import net.wicp.tams.common.binlog.alone.parser.ParseLogOnline;
import net.wicp.tams.duckula.client.Protobuf3.DuckulaEvent;

/**
 * Flink source that produces {@link DuckulaEvent} records by driving a
 * {@link ParseLogOnline} binlog reader.
 *
 * <p>
 * The reader is configured from a {@link ConnConf}. The listener and checkpoint
 * implementations are always forced to {@link #LISTENER_CLASS} and
 * {@link #CHECKPOINT_CLASS}, overriding whatever the supplied configuration
 * contained. The current reader checkpoint is stored in Flink state through
 * {@link ListCheckpointed}; on restore, the saved position replaces the
 * configured start position and the saved column history is handed back to the
 * reader before it starts.
 */
@Slf4j
public class BinlogSource extends RichSourceFunction<DuckulaEvent> implements ListCheckpointed<CheckPoint> {

	private static final long serialVersionUID = 1L;

	/** Listener class name written into every connection configuration. */
	private static final String LISTENER_CLASS = "net.wicp.tams.common.flink.source.binlog.FlinkBinlogListener";

	/** Checkpoint class name written into every connection configuration. */
	private static final String CHECKPOINT_CLASS = "net.wicp.tams.common.binlog.alone.checkpoint.CheckPointMemory";

	/** Connection configuration; rebuilt in {@link #restoreState(List)} when a position is restored. */
	private ConnConf connConf;

	/**
	 * Binlog reader. Created in {@link #open(Configuration)}, so it is still null
	 * while the function object is being serialized; marked transient so it can
	 * never take part in that serialization.
	 */
	private transient ParseLogOnline logFetcher;

	/** Column history taken from a restored checkpoint; stays null when nothing was restored. */
	private List<ColHis> colsList = null;

	/**
	 * Creates a source from a caller-supplied configuration builder.
	 *
	 * <p>
	 * The builder is modified: its listener and checkpoint class are overwritten
	 * before the configuration is built.
	 *
	 * @param connConfBuilder connection configuration to build from
	 */
	public BinlogSource(ConnConf.Builder connConfBuilder) {
		connConfBuilder.setListener(LISTENER_CLASS);
		// Related configuration key: common.binlog.alone.conf.default.chk
		connConfBuilder.setChk(CHECKPOINT_CLASS);
		log.info("====设置chk:{}", CHECKPOINT_CLASS);
		this.connConf = connConfBuilder.build();
	}

	/**
	 * Creates a source from a named listener configuration. The matching
	 * configuration must exist in the configuration file.
	 *
	 * @param configKey key of the listener configuration
	 */
	public BinlogSource(String configKey) {
		this(BusiAssit.configMap(configKey));
	}

	/** Creates a source from the configuration stored under the key {@code "default"}. */
	public BinlogSource() {
		this("default");
	}

	/**
	 * Resolves the listener and checkpoint classes through the user-code class
	 * loader and creates the binlog reader.
	 *
	 * @param parameters Flink configuration, passed through to the superclass
	 * @throws Exception if a class cannot be loaded or the reader cannot be created
	 */
	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		// The loaded classes are not used here; the calls fail fast when a class is not
		// visible to the job's class loader. Presumably the reader instantiates both by
		// name later on -- confirm against ParseLogOnline.
		ClassLoader userClassLoader = getRuntimeContext().getUserCodeClassLoader();
		userClassLoader.loadClass(LISTENER_CLASS);
		userClassLoader.loadClass(CHECKPOINT_CLASS);
		this.logFetcher = new ParseLogOnline(connConf.toBuilder());
	}

	/**
	 * Returns the reader's current checkpoint as the state of this subtask.
	 *
	 * @param checkpointId id of the Flink checkpoint (unused)
	 * @param timestamp    timestamp of the Flink checkpoint (unused)
	 * @return a list holding the current checkpoint, or an empty list when the
	 *         reader has none to report
	 */
	@Override
	public List<CheckPoint> snapshotState(long checkpointId, long timestamp) throws Exception {
		List<CheckPoint> listState = new ArrayList<>();
		CheckPoint current = logFetcher.getCheckPointCur();
		if (current == null) {
			// Defensive: a null element would be rejected by Flink's list state and fail
			// the whole checkpoint. Whether the reader can return null is not visible here.
			log.warn("no current checkpoint available for flink checkpoint {}, snapshotting empty state", checkpointId);
			return listState;
		}
		listState.add(current);
		return listState;
	}

	/**
	 * Applies a restored checkpoint: the saved position becomes the start position
	 * of the connection configuration and the saved column history is kept for
	 * {@link #run(SourceContext)}.
	 *
	 * <p>
	 * When no state is assigned to this subtask the configured start position is
	 * left untouched.
	 *
	 * @param state restored checkpoints; only the first element is used
	 */
	@Override
	public void restoreState(List<CheckPoint> state) throws Exception {
		if (state == null || state.isEmpty()) {
			// Flink may hand over an empty list (for example after rescaling).
			log.info("no checkpoint state to restore, keeping the configured start position");
			return;
		}
		CheckPoint restored = state.get(0);
		Position pos = restored.getPos();
		log.info("the binlog begin from:{}", pos.getGtids());
		Builder restoredConf = connConf.toBuilder();
		restoredConf.setPos(pos);
		this.connConf = restoredConf.build();
		this.colsList = restored.getColsList();
	}

	/**
	 * Hands the Flink source context to the reader's listener, passes on any
	 * restored column history and starts reading.
	 *
	 * @param ctx Flink source context given to the listener
	 */
	@Override
	public void run(SourceContext<DuckulaEvent> ctx) throws Exception {
		FlinkBinlogListener binlogListener = (FlinkBinlogListener) this.logFetcher.getBinlogListener();
		binlogListener.setCtx(ctx);
		// colsList is null unless restoreState supplied a checkpoint.
		this.logFetcher.setColHis(this.colsList);
		this.logFetcher.read();
	}

	/** Closes the reader if it has been created. */
	@Override
	public void cancel() {
		if (this.logFetcher == null) {
			return;
		}
		log.info("============cancel the logFetcher");
		this.logFetcher.close();
	}

}
